package pkg

import (
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"time"
)

// AmqpConnectOption holds the parameters from which the AMQP connection URL
// ("amqp://user:password@host:port/vhost") is built.
type AmqpConnectOption struct {
	Host     string // broker host name or address
	Port     int    // broker TCP port
	Vhost    string // virtual host; NewAmqpConnection maps "" to "/" and prefixes any other value with "/"
	Username string // user name for the connection URL
	Password string // password for the connection URL
}

// AmqpConnection wraps an *amqp.Connection together with the options used to
// dial it and two state flags that drive the reconnect logic.
//
// NOTE(review): connected and closed are plain bools that are written by
// Disconnect and by monitorConnect (which NewAmqpConnection starts with `go`)
// and read by getRealChannel, with no mutex or atomics in this type. This looks
// like a data race whenever those run on different goroutines — confirm and
// consider guarding the fields.
type AmqpConnection struct {
	logID          string            // prefix written at the start of every log line
	option         AmqpConnectOption // connection parameters used when dialing
	realConnection *amqp.Connection  // most recently dialed connection; nil until the first successful dial
	// Whether the underlying connection is currently established.
	connected bool
	// Whether the connection has been closed (set by Disconnect).
	closed bool
}

// connect dials the broker until a connection is established or the wrapper
// has been closed, sleeping 500ms after every failed attempt.
//
// The credential-free part of the URI (host, port, vhost) is parsed first and
// the user name and password are then set on the parsed amqp.URI, so that
// URI.String percent-encodes them. Interpolating them directly into the URI
// text breaks as soon as a credential contains a reserved character such as
// '@', '/', '?', '#', '%' or a space. If the credential-free URI cannot be
// parsed, the raw URI is used unchanged so that Dial reports the error.
//
// If the wrapper is closed while a dial is in flight, the freshly dialed
// connection is closed and discarded instead of being stored, so it cannot
// leak and connected is not switched back on after Disconnect.
func (c *AmqpConnection) connect() {
	opt := c.option

	// Fallback: the raw URI, used only when the credential-free form below
	// does not parse.
	dialURL := fmt.Sprintf("amqp://%s:%s@%s:%d%s",
		opt.Username, opt.Password, opt.Host, opt.Port, opt.Vhost)
	if uri, err := amqp.ParseURI(fmt.Sprintf("amqp://%s:%d%s", opt.Host, opt.Port, opt.Vhost)); err == nil {
		uri.Username = opt.Username
		uri.Password = opt.Password
		dialURL = uri.String()
	}

	// Keep dialing only while the wrapper is neither closed nor connected.
	for !c.closed && !c.connected {
		conn, err := amqp.Dial(dialURL)
		if err != nil {
			log.Printf("%s amqp 获取connection 异常 %s\n", c.logID, err.Error())
			time.Sleep(time.Millisecond * 500)
			continue
		}
		if c.closed {
			// Disconnect ran while the dial was in flight: nobody will ever
			// close this connection for us, so release it here. The close
			// error is irrelevant because the connection was never used.
			_ = conn.Close()
			return
		}
		c.realConnection = conn
		c.connected = true
		log.Printf("%s amqp 获取connection 成功\n", c.logID)
	}
}

// monitorConnect watches the current connection for close notifications and
// reconnects via connect whenever the connection drops while the wrapper has
// not been closed. It returns once closed is set.
func (c *AmqpConnection) monitorConnect() {
	for {
		if c.closed {
			break
		}

		// Register a fresh listener on whatever connection is current and
		// block until the library reports the close (a nil error is received
		// when the channel is closed without an error being sent).
		notify := c.realConnection.NotifyClose(make(chan *amqp.Error))
		var reason string
		if closeErr := <-notify; closeErr != nil {
			reason = closeErr.Error()
		}
		log.Printf("%s amqp connection 断开 %s\n", c.logID, reason)
		c.connected = false

		if c.closed {
			// Closed on purpose: fall through to the loop check and stop.
			continue
		}
		// The connection dropped unexpectedly, so dial again.
		log.Printf("%s amqp connection 断开,执行重连\n", c.logID)
		c.connect()
	}
	log.Printf("%s amqp connection monitor 停止\n", c.logID)
}

// getRealChannel opens a channel on the current connection, retrying every
// 500ms while the connection is down or channel creation fails. It returns
// nil once the wrapper has been closed.
func (c *AmqpConnection) getRealChannel() *amqp.Channel {
	const retryDelay = time.Millisecond * 500

	for {
		if c.closed {
			return nil
		}
		if c.connected {
			ch, err := c.realConnection.Channel()
			if err == nil {
				log.Printf("%s amqp 获取channel 成功\n", c.logID)
				return ch
			}
			log.Printf("%s amqp 获取channel 异常 %s\n", c.logID, err.Error())
		}
		// Either the connection is not up yet or opening the channel failed;
		// wait before the next attempt.
		time.Sleep(retryDelay)
	}
}

// Disconnect marks the wrapper as closed, closes the underlying connection if
// one was ever dialed, and clears the connected flag.
func (c *AmqpConnection) Disconnect() {
	// Raise the closed flag before closing: monitorConnect checks it after the
	// close notification arrives and skips the reconnect when it is set.
	c.closed = true
	if conn := c.realConnection; conn != nil {
		// Best-effort close; the error is intentionally discarded.
		_ = conn.Close()
	}
	c.connected = false
}

// NewAmqpConnection builds a connection wrapper, dials the broker and starts
// the background monitor goroutine that reconnects after a drop.
//
// The vhost is normalized to URL-path form: "" and "/" both become "/", and
// any other value is prefixed with "/" (a value that already starts with "/"
// therefore ends up with two leading slashes).
//
// The call does not return until the first dial has succeeded, because
// connect keeps retrying while the new wrapper is neither connected nor closed.
// logID is prepended to every log line the wrapper writes.
func NewAmqpConnection(option AmqpConnectOption, logID string) *AmqpConnection {
	switch option.Vhost {
	case "", "/":
		option.Vhost = "/"
	default:
		option.Vhost = "/" + option.Vhost
	}

	conn := &AmqpConnection{logID: logID, option: option}
	conn.connect()
	go conn.monitorConnect()
	return conn
}
